fix(core): make appendcol row ordering deterministic on parallel engines #5474

Merged
ahkcs merged 1 commit into opensearch-project:main from ahkcs:fix/appendcol-deterministic-ordering
May 28, 2026

@ahkcs ahkcs commented May 27, 2026

Collaborator

What's broken

appendcol pairs the wrong rows together on parallel engines (analytics engine via DataFusion, Spark pushdown), and rows come out in scrambled order.

Internally appendcol lowers to:

main      → ROW_NUMBER() OVER ()
subsearch → ROW_NUMBER() OVER ()
FULL JOIN on (main_rownum = subsearch_rownum)
(no trailing sort)

That only works on a single-threaded engine. On a parallel one, row numbers are assigned arbitrarily and the join then glues the wrong subsearch row onto the wrong main row.

Root cause, step by step (using the IT data)

Step 1 — what appendcol does mechanically

testAppendCol query:

source=account | stats sum(age) as sum by gender, state | sort gender, state
  | appendcol [ stats count(age) as cnt by gender | sort gender ]
  | fields gender, state, sum, cnt | head 10

Main (after stats sum(age) by gender, state | sort gender, state) — many rows, one per (gender, state). F rows come first alphabetically, then M rows:

F/AK   sum=317
F/AL   sum=397
F/AR   sum=229
F/AZ   sum=238
…
M/AK   sum=…
M/MD   sum=580
…

Subsearch (stats count(age) by gender | sort gender) — exactly 2 rows:

F   cnt=493     ← count of all F docs
M   cnt=507     ← count of all M docs

appendcol (1) numbers each main row, (2) numbers each subsearch row, (3) FULL JOINs on those row numbers.

Step 2 — on v2 (serial), row numbers follow input order

Main:    _rn_main_       Sub:        _rn_sub_
F/AK     1               F  cnt=493  1
F/AL     2               M  cnt=507  2
F/AR     3
…

Join on _rn_main_ = _rn_sub_:

Main        Paired with     Result
F/AK (1)    F, 493 (1)      F/AK gets cnt=493
F/AL (2)    M, 507 (2)      F/AL gets cnt=507 ✅ (M's count lands on an F row; that's the positional zip)
F/AR (3)+   nothing         cnt=null ✅

After head 10: F/AK, F/AL, F/AR, F/AZ, F/CA, F/CO, F/CT, F/DC, F/DE, F/FL — exactly what the IT asserts.
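
The serial behaviour in Steps 1 and 2 can be modelled in a few lines of plain Java. This is only a toy sketch of the positional zip, not the plugin's lowering: the class name PositionalZipSketch and the zip helper are made up for illustration, and rows are reduced to strings.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of appendcol's positional zip on a serial engine (hypothetical, not plugin code). */
final class PositionalZipSketch {

    /** Full outer join of two lists on their position; the shorter side is padded with null. */
    static List<String> zip(List<String> main, List<String> sub) {
        List<String> out = new ArrayList<>();
        int rows = Math.max(main.size(), sub.size());
        for (int i = 0; i < rows; i++) {
            // Position i plays the role of the row number i + 1 on both sides.
            String left = i < main.size() ? main.get(i) : null;
            String right = i < sub.size() ? sub.get(i) : null;
            out.add(left + " cnt=" + right);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> main = List.of("F/AK", "F/AL", "F/AR");
        List<String> sub = List.of("493", "507");
        zip(main, sub).forEach(System.out::println);
    }
}
```

Because both lists are walked in input order, the first two main rows pick up the two subsearch values and every later row gets null, which is the pairing the IT asserts.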

Step 3 — on the analytics route (DataFusion, parallel), row numbers are arbitrary

With an empty OVER (), ROW_NUMBER() carries no ordering guarantee, so a parallel engine is free to number rows in any order. One possible assignment:

Main:    _rn_main_         Sub:        _rn_sub_
F/AK     13   ← arbitrary  F  cnt=493  2   ← arbitrary
F/AL     5                 M  cnt=507  1
F/AR     28
…
M/MD     1    ← happened to get 1

Same FULL JOIN:

Main                 Paired with     Result
M/MD (1)             M, 507 (1)      M/MD gets cnt=507 (coincidentally self-consistent)
some main row (2)    F, 493 (2)      some random main row gets cnt=493
F/AK (13)            nothing         F/AK gets cnt=null ❌ (should be 493)
F/AL (5)             nothing         F/AL gets cnt=null ❌ (should be 507)
…                    nothing         null

This matches the actual IT failure:

Expected: [F,AK,317,493], [F,AL,397,507], [F,AR,229,null], …, [F,FL,310,null]
Actual contained: ["M","MD",580,507]    ← M row leaked into top-10

M/MD leaked into the top 10 (sort order was lost too), and the cnt values landed on the wrong main rows entirely.
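
To see why arbitrary numbering breaks the pairing, here is a small stand-alone model of a FULL JOIN on row numbers. The class ScrambledRowNumberSketch, the helper fullJoinOnRowNumber, and the specific numbering (a four-row main side, with F/AR standing in for "some main row") are assumptions chosen to mirror the table above, not actual engine output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Toy FULL JOIN on row numbers with a made-up, non-deterministic numbering. */
final class ScrambledRowNumberSketch {

    /** Joins both sides on row number; a side with no match contributes null. */
    static List<String> fullJoinOnRowNumber(Map<Integer, String> main, Map<Integer, String> sub) {
        // Union of row numbers from both sides; sorted only to keep the demo output stable.
        TreeSet<Integer> rowNumbers = new TreeSet<>(main.keySet());
        rowNumbers.addAll(sub.keySet());
        List<String> out = new ArrayList<>();
        for (Integer rn : rowNumbers) {
            out.add(main.get(rn) + " cnt=" + sub.get(rn));
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical numbering a parallel engine might hand out.
        Map<Integer, String> main = Map.of(3, "F/AK", 4, "F/AL", 2, "F/AR", 1, "M/MD");
        Map<Integer, String> sub = Map.of(2, "493", 1, "507");
        fullJoinOnRowNumber(main, sub).forEach(System.out::println);
    }
}
```

With this numbering the join attaches 507 to M/MD and 493 to F/AR, while F/AK and F/AL end up with null, matching the shape of the IT failure.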

Step 4 — a downstream sort can't fix this

What if we add | sort gender, state at the end to "fix the order"?

F/AK, sum=317, cnt=null     ← still wrong, IT wants 493
F/AL, sum=397, cnt=null     ← still wrong, IT wants 507
F/AR, sum=229, cnt=null
…
M/MD, sum=580, cnt=507      ← the wrong-attached cnt is still attached

A sort only reorders tuples; it can't pry one open and swap cnt values between rows. The exact tuple [F, AK, 317, 493] the IT asserts doesn't exist anywhere in the broken output — no row anywhere has F/AK paired with 493. No verifyDataRowsInAnyOrder or trailing sort can recover that.
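
The same point in code: sorting a list of already-joined tuples changes their order but never their contents. The sketch below assumes Java 16+ for records; the Row type and the four sample tuples are hypothetical and only mirror the broken output described above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Shows that a downstream sort reorders joined tuples but cannot re-pair them. */
final class SortCannotRepairSketch {

    /** One joined output row; cnt is null when no subsearch row was attached. */
    record Row(String gender, String state, Integer cnt) {}

    /** Equivalent of a trailing "| sort gender, state". */
    static List<Row> sortByGenderState(List<Row> rows) {
        List<Row> sorted = new ArrayList<>(rows);
        sorted.sort(Comparator.comparing(Row::gender).thenComparing(Row::state));
        return sorted;
    }

    public static void main(String[] args) {
        // Hypothetical mis-paired tuples produced by the broken join.
        List<Row> broken = List.of(
            new Row("M", "MD", 507),
            new Row("F", "AR", 493),
            new Row("F", "AK", null),
            new Row("F", "AL", null));
        List<Row> sorted = sortByGenderState(broken);
        sorted.forEach(System.out::println);
        // The tuple the IT expects is still absent after sorting.
        System.out.println(sorted.contains(new Row("F", "AK", 493)));
    }
}
```

After the sort, F/AK is first again but still carries cnt=null, and the final contains check is false.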

Step 5 — the fix, applied to the same scenario

The wrong pairing is committed inside visitAppendCol (Step 3). Once those tuples exist, no downstream PPL command can undo them — so the lowering itself must produce deterministic row numbers. Two changes do that:

(a) Fill the window's OVER (???) with the upstream sort's collation:

Main:      ROW_NUMBER() OVER (ORDER BY gender, state)
Subsearch: ROW_NUMBER() OVER (ORDER BY gender)

DataFusion is now obliged to sort by those keys before numbering, so it assigns the same numbers v2 would:

Main:    _rn_main_       Sub:        _rn_sub_
F/AK     1               F  cnt=493  1
F/AL     2               M  cnt=507  2
F/AR     3
F/AZ     4
…

The FULL JOIN on row numbers now pairs correctly: F/AK ↔ F(493), F/AL ↔ M(507), rest null. The tuples are right: [F, AK, 317, 493] and [F, AL, 397, 507] exist in memory.

(b) Add a trailing sort by _rn_main_, _rn_sub_ (NULLS LAST) after the join:

DataFusion's hash join can still scramble output order even when the individual tuples are correct. The trailing sort puts them back in row-number order, which (by change (a)) is the upstream sort order:

F/AK, sum=317, cnt=493     ← _rn_main_ = 1
F/AL, sum=397, cnt=507     ← _rn_main_ = 2
F/AR, sum=229, cnt=null    ← _rn_main_ = 3
F/AZ, sum=238, cnt=null    ← _rn_main_ = 4
…

After head 10: the same 10 F-rows v2 produced (F/AK … F/FL), with the right cnt on each. The IT now passes 2/2 on the analytics route.

Both pieces are required — (a) alone leaves the post-join output scrambled by the hash join; (b) alone sorts by row numbers that are themselves random.
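
As a rough illustration of change (b), the trailing sort amounts to ordering the join output by the two row-number columns with nulls last. The sketch below is a plain-Java stand-in (Java 16+ for records); the Joined record, the restoreOrder helper, and the sample rows are assumptions for illustration, not the plan nodes the PR emits.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Models the trailing sort on (_rn_main_, _rn_sub_) NULLS LAST after the join. */
final class TrailingSortSketch {

    /** One join output row; a row number is null when that side had no match. */
    record Joined(Integer rnMain, Integer rnSub, String main, String cnt) {}

    private static final Comparator<Integer> NULLS_LAST =
        Comparator.nullsLast(Comparator.naturalOrder());

    /** Order by main row number, then subsearch row number, nulls last on both. */
    static final Comparator<Joined> BY_ROW_NUMBERS =
        Comparator.comparing(Joined::rnMain, NULLS_LAST)
            .thenComparing(Joined::rnSub, NULLS_LAST);

    static List<Joined> restoreOrder(List<Joined> joinOutput) {
        List<Joined> sorted = new ArrayList<>(joinOutput);
        sorted.sort(BY_ROW_NUMBERS);
        return sorted;
    }

    public static void main(String[] args) {
        // Correctly paired tuples, emitted in a scrambled order by a hash join.
        List<Joined> scrambled = List.of(
            new Joined(3, null, "F/AR", null),
            new Joined(1, 1, "F/AK", "493"),
            new Joined(2, 2, "F/AL", "507"));
        restoreOrder(scrambled)
            .forEach(r -> System.out.println(r.main() + " cnt=" + r.cnt()));
    }
}
```

Nulls-last matters when one side is longer than the other: rows that exist only on the subsearch side have no main row number and should trail the rows that do.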

Where the ROW_NUMBER() OVER (...) is emitted

The window expression is built inside visitAppendCol via a helper PlanUtils.makeOver:

RexNode mainRowNumber = PlanUtils.makeOver(
    context,
    BuiltinFunctionName.ROW_NUMBER,
    null,
    List.of(),
    List.of(),                       // partition keys (empty)
    mainOrderKeys,                   // ← the ORDER BY list — this is what we change
    WindowFrame.toCurrentRow());

context.relBuilder.projectPlus(
    context.relBuilder.alias(mainRowNumber, ROW_NUMBER_COLUMN_FOR_MAIN));

End-to-end path of the offending construct:

PPL "| appendcol [...]"
   ↓  parser → AST AppendCol
visitAppendCol → PlanUtils.makeOver(... orderKeys=??? ...)
   ↓  produces a RexOver { ROW_NUMBER, OVER(orderKeys=???) }
LogicalProject(..., _row_number_main_=[ROW_NUMBER() OVER (???)])
   ↓  Calcite → Substrait
DataFusion's window operator runs it

Before the fix, mainOrderKeys was hard-coded to List.of(), so DataFusion saw bare OVER () and assigned numbers arbitrarily.

Fix

Two changes in visitAppendCol:

  1. Fill the ???: change mainOrderKeys from hard-coded List.of() to deriveCollationOrderKeys(context), which reads the upstream collation via RelMetadataQuery.collations(peek()). So OVER () becomes OVER (ORDER BY <upstream sort>) — deterministic on DataFusion.
  2. Trailing sort by the row-number columns after the FULL JOIN, so the final output order doesn't depend on join execution (same pattern streamstats already uses).

No behavior change on v2/Calcite (its serial execution already produced this order).

Results

CalcitePPLAppendcolIT:

         Analytics engine   v2/Calcite
Before   0/2                2/2
After    2/2                2/2

Testing

  • CalcitePPLAppendcolTest (5 unit tests, regenerated plans) ✅
  • CalcitePPLAppendcolIT 2/2 on both paths ✅
  • NewAddedCommandsIT.testAppendcol
  • appendcol.md doctest ✅ (no doc edits needed)
  • spotlessCheck clean ✅

Check List

  • New functionality includes testing.
  • New functionality has been documented (n/a — behavior-preserving fix).
  • Commits are signed per the DCO using --signoff.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

github-actions Bot commented May 27, 2026

Contributor

PR Reviewer Guide 🔍

(Review updated until commit 41ef5f7)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Empty collation fallback

When deriveCollationOrderKeys returns an empty list (no collation available), ROW_NUMBER() OVER (ORDER BY <empty>) is generated. This is semantically equivalent to the old ROW_NUMBER() OVER () and still non-deterministic on parallel engines. The fix only works if a collation is present; queries without one remain broken.

private static List<RexNode> deriveCollationOrderKeys(CalcitePlanContext context) {
  RelBuilder relBuilder = context.relBuilder;
  List<RelCollation> collations =
      relBuilder.getCluster().getMetadataQuery().collations(relBuilder.peek());
  if (collations == null || collations.isEmpty()) {
    return List.of();
  }
  List<RexNode> orderKeys = new ArrayList<>();
  for (RelFieldCollation fieldCollation : collations.get(0).getFieldCollations()) {
    RexNode key = relBuilder.field(fieldCollation.getFieldIndex());
    if (fieldCollation.direction.isDescending()) {
      key = relBuilder.desc(key);
    }
    if (fieldCollation.nullDirection == RelFieldCollation.NullDirection.LAST) {
      key = relBuilder.nullsLast(key);
    } else if (fieldCollation.nullDirection == RelFieldCollation.NullDirection.FIRST) {
      key = relBuilder.nullsFirst(key);
    }
    orderKeys.add(key);
  }
  return orderKeys;
}
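
The fallback the reviewer describes can be illustrated without Calcite on the classpath. The following is a simplified stand-in, not the helper above: SortKey and rowNumberOver are hypothetical names, null direction is reduced to a boolean (the real code also has an unspecified case), and the output is just the SQL text of the window clause. Records need Java 16+.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Renders the ROW_NUMBER window clause for a given (possibly empty) collation. */
final class OverClauseSketch {

    /** Hypothetical stand-in for one field of an upstream collation. */
    record SortKey(String field, boolean descending, boolean nullsLast) {}

    static String rowNumberOver(List<SortKey> collation) {
        if (collation.isEmpty()) {
            // No known input order: falls back to the bare, non-deterministic window.
            return "ROW_NUMBER() OVER ()";
        }
        String orderBy = collation.stream()
            .map(k -> k.field()
                + (k.descending() ? " DESC" : "")
                + (k.nullsLast() ? " NULLS LAST" : " NULLS FIRST"))
            .collect(Collectors.joining(", "));
        return "ROW_NUMBER() OVER (ORDER BY " + orderBy + ")";
    }

    public static void main(String[] args) {
        System.out.println(rowNumberOver(List.of(new SortKey("EMPNO", false, true))));
        System.out.println(rowNumberOver(List.of()));
    }
}
```

The second call shows the case flagged in the review: with no collation to draw from, the generated window is the same bare OVER () as before the change.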

@ahkcs ahkcs added the PPL (Piped processing language) and enhancement (New feature or request) labels May 27, 2026

appendcol lowers to a FULL JOIN of two ROW_NUMBER() OVER () windows
(empty PARTITION BY / ORDER BY) on _row_number_main_ = _row_number_subsearch_,
with no trailing sort. That positional zip is only correct on a serial,
order-preserving executor: a bare ROW_NUMBER() OVER () assigns sequence
numbers in input order and the join preserves it. On a parallel/distributed
backend the row-number assignment is arbitrary and the hash join drops
ordering, so columns get zipped onto the wrong rows and downstream `head`
slices a non-deterministic subset.

Fix visitAppendCol to not depend on implicit input-order preservation:
- derive an explicit window ORDER BY from each child's collation
  (deriveCollationOrderKeys), so ROW_NUMBER assignment follows the upstream
  sort; falls back to the prior bare OVER () when the input has no collation
  (positional correspondence is undefined without a sort).
- add a trailing sort by the row-number columns after the join (NULLS LAST,
  same pattern as streamstats) so output order is deterministic regardless of
  how the backend executes the join.

No behavior change on the serial v2/Calcite engine; makes the lowering correct
on parallel backends. Updates CalcitePPLAppendcolTest expected plans/SparkSQL.

Signed-off-by: Kai Huang <ahkcs@amazon.com>
@ahkcs ahkcs force-pushed the fix/appendcol-deterministic-ordering branch from 094e05e to 41ef5f7 Compare May 27, 2026 22:11
@github-actions

Contributor

Persistent review updated to latest commit 41ef5f7

  + " `t`.`COMM`, `t`.`DEPTNO`\n"
  + "FROM (SELECT `EMPNO`, `ENAME`, `JOB`, `MGR`, `HIREDATE`, `SAL`, `COMM`, `DEPTNO`,"
- + " ROW_NUMBER() OVER () `_row_number_main_`\n"
+ + " ROW_NUMBER() OVER (ORDER BY `EMPNO` NULLS LAST) `_row_number_main_`\n"

Collaborator

Why ORDER BY EMPNO?

Collaborator Author

Our helper calls getMetadataQuery().collations(peek()), which for LogicalTableScan(scott.EMP) returns the table's declared [EMPNO ASC NULLS LAST] (SCOTT models EMPNO as the primary key). We thread that into the window's OVER (...); OVER (ORDER BY <known input order>) forces deterministic ROW_NUMBER assignment.

@ahkcs ahkcs requested a review from penghuo May 28, 2026 18:39
@ahkcs ahkcs merged commit 9300574 into opensearch-project:main May 28, 2026
46 checks passed
@ahkcs ahkcs deleted the fix/appendcol-deterministic-ordering branch May 28, 2026 22:58
Labels

enhancement New feature or request PPL Piped processing language

3 participants